from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime
from enum import IntEnum
from importlib import resources
from typing import Callable, Dict, List, Optional, Tuple, Union

import aspectlib
import numpy as np
import pandas as pd
import parse
from easydict import EasyDict
from ruamel import yaml

from .trader.trader import Trader
from .vlogger import VLogger


class TradeOp(IntEnum):
    HOLD = 0
    BUY = 1
    SELL = 2
    SKIP = 3
    OPEN = 4
    CLOSE = 5
    SKIP_BUY = 6
    SKIP_SELL = 7


# 每格大小为step，中值为mid
def grid_by_mid_step(mid: float, step: float, num_grid: int):
    assert num_grid % 2 == 0
    lower_bound = mid - step * (num_grid / 2 - 1)
    upper_bound = mid + step * (num_grid / 2)
    return grid_by_lh(lower_bound, upper_bound, num_grid)


# 每格大小为percent，中值为mid
def grid_by_mid_step_percent(mid: float, percent: float, num_grid: int):
    assert num_grid % 2 == 0
    grid: deque = deque()
    for mul in range(0, num_grid // 2):
        grid.append(mid * np.power((1 + percent), mul + 1))
        grid.appendleft(mid * np.power((1 - percent), mul))
    return list(grid)


def grid_by_lh(lower_bound: float, upper_bound: float, num_grid: int):
    grid = np.linspace(lower_bound, upper_bound, num_grid)
    return grid.tolist()


@aspectlib.Aspect
def log_signal(*args):
    result = yield aspectlib.Proceed
    stself = args[0]
    stself.signals.append(result)
    yield aspectlib.Return(result)


def log_signal2(impl):

    def wrapper(*args):
        ret = impl(*args)
        st_self = args[0]
        st_self.signals.append(ret)
        return ret

    return wrapper


class TradeStrategy(ABC):

    def __init__(self, description: str, trader: Union[None, Trader] = None):
        self.description = description
        self.signals = []
        self._v = 0
        self._trader = trader

    def set_v(self, v: int):
        self._v = v

    @abstractmethod
    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        pass

    @abstractmethod
    def init(self, context):
        pass

    @abstractmethod
    def handle_bar(self, context, bar_dict):
        pass


class GridStrategy(TradeStrategy, ABC):

    def __init__(self, description: str, grid: List[float], order_book_id: str,
                 lots_per_grid: int):
        super().__init__(description)
        self.grid = grid
        self.order_book_id = order_book_id
        self.signal: Union[pd.Series, None] = None
        self.lots = 0
        self._lots_per_grid = lots_per_grid

    @property
    def grid(self):
        return self._grid

    @grid.setter
    def grid(self, value: List[float]):
        self._grid = value

    def grid_range(self, grid_id: Optional[int] = None) -> List[float]:
        if grid_id is None:
            return self.grid
        else:
            if grid_id == 0:
                return [np.NINF, self.grid[0]]
            elif grid_id == len(self.grid):
                return [self.grid[-1], np.Inf]
            else:
                return [self.grid[grid_id - 1], self.grid[grid_id]]

    @property
    def signal(self):
        return self._signal

    @signal.setter
    def signal(self, value: pd.Series):
        self._signal = value

    def __str__(self):
        return '网格大小:{0}，网格:{1}'.format(len(self.grid), self.grid)

    def init_state(self, signal: pd.Series, lots: int):
        self.signal = signal
        self.lots = lots

    def grid_lots(self, grid_id: int) -> int:
        return (len(self.grid) - grid_id) * self._lots_per_grid

    def grid_id(self, price: float) -> int:
        return int(np.searchsorted(self.grid, price))

    @staticmethod
    def make_signal(df: pd.DataFrame, signal: TradeOp, grid_id: int, op: str,
                    price: float):
        result = df.copy()
        result['signal'] = signal
        result['grid_id'] = grid_id
        result['op'] = op
        result['price'] = price
        return result

    def init(self, context):
        self._trader.subscribe(self.order_book_id, '1d')
        VLogger.vlog(
            0,
            'Backtesting %s on %s...' % (self.description, self.order_book_id))

    def order_by_signal(self, context, signal: pd.Series):
        self._trader.order_target_percent(context,
                                          self.order_book_id, 1 -
                                          float(signal.grid_id) /
                                          len(self.grid),
                                          signal.price)

    def handle_bar(self, context, bar_dict):
        bar = bar_dict[self.order_book_id]
        data = pd.DataFrame.from_records([{
            'order_book_id': self.order_book_id,
            'datetime': context.now,
            'open': bar.open,
            'low': bar.low,
            'high': bar.high,
            'close': bar.close,
        }],
            index=['datetime', 'order_book_id'])
        signal = self.generate_signal(data, context.now).iloc[0]
        if signal.signal in [
                TradeOp.OPEN, TradeOp.CLOSE, TradeOp.BUY, TradeOp.SELL
        ]:
            self.order_by_signal(context, signal)


# 最基础的网格，只看当前区间决定买卖信号。
# 基础网格适用于中长线（例如按照日线进行），不适合短线（日内）或者根据分钟线数据进行交易
# 因为日内交易波动区间较小，可能在网格区间反复穿越。
class BasicGrid(GridStrategy):

    def __init__(self, grid: List[float], order_book_id: str, lots_per_grid=1):
        super().__init__('基础网格', grid, order_book_id, lots_per_grid)

    @log_signal
    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        assert len(df) == 1
        instrument = df.iloc[0]
        grid_id = self.grid_id(instrument.open)
        if self.signal is None:
            op = '建仓:%s' % grid_id
            s = TradeOp.OPEN
        elif self.signal.grid_id == grid_id:
            op = '格内震荡: %s' % grid_id
            s = TradeOp.HOLD
        elif grid_id < self.signal.grid_id:
            op = '加仓:%s->%s' % (self.signal.grid_id, grid_id)
            s = TradeOp.BUY
        else:
            op = '减仓:%s->%s' % (self.signal.grid_id, grid_id)
            s = TradeOp.SELL
        result = self.make_signal(df, s, grid_id, op, instrument.open)
        self.signal = result.iloc[0]
        return result


# 增强网格，如果两次买卖之间的差价小于阈值，不触发网格操作。
# 目的是避免反复穿越网格线造成频繁交易白交手续费
class EnhancedBasicGrid(GridStrategy):

    def __init__(self,
                 grid: List[float],
                 order_book_id: str,
                 threshold: float,
                 lots_per_grid=1):
        super().__init__('增强网格', grid, order_book_id, lots_per_grid)
        self._threshold = threshold

    @log_signal
    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        assert len(df) == 1
        instrument = df.iloc[0]
        grid_id = self.grid_id(instrument.open)
        if self.signal is None:
            op = '建仓:%s，%s' % (grid_id, self.grid_range(grid_id))
            s = TradeOp.OPEN
        elif self.signal.grid_id == grid_id:
            price_diff = instrument.open - self.signal.price
            gr = self.grid_range(grid_id)
            op = '格内震荡: {}，[{:.2f}, {:.2f}], 价差: {:.2f} ({:.2%})'.format(
                grid_id, gr[0], gr[1], price_diff,
                price_diff / self.signal.price)
            s = TradeOp.HOLD
        else:
            if self.signal.grid_id == 0:
                threshold = (self.grid[1] - self.grid[0])
            elif self.signal.grid_id == len(self.grid):
                threshold = (self.grid[-1] - self.grid[-2])
            else:
                threshold = self.grid[self.signal.grid_id -
                                      1] - self.grid[self.signal.grid_id - 2]
            threshold *= self._threshold
            price_diff = instrument.open - self.signal.price
            if abs(price_diff) < threshold:
                op = '忽略信号:{0}->{1}，价格差abs({2:.2f} - {3:.2f}) = {4:.2f}({5:.2%})小于阈值{6:.2f}'.format(
                    self.signal.grid_id, grid_id, self.signal.price,
                    instrument.open, abs(price_diff),
                    price_diff / self.signal.price, threshold)
                s = TradeOp.SKIP
            else:
                if grid_id < self.signal.grid_id:
                    op = '加仓:{}->{}，{}, 价差: {:.2f} ({:.2%})'.format(
                        self.signal.grid_id, grid_id,
                        str(self.grid_range(grid_id)), price_diff,
                        price_diff / self.signal.price)
                    s = TradeOp.BUY
                else:
                    op = '减仓:{}->{}，{}, 价差: {:.2f} ({:.2%})'.format(
                        self.signal.grid_id, grid_id,
                        str(self.grid_range(grid_id)), price_diff,
                        price_diff / self.signal.price)
                    s = TradeOp.SELL

        result = self.make_signal(df, s, grid_id, op, instrument.open)
        if s in [TradeOp.OPEN, TradeOp.CLOSE, TradeOp.BUY, TradeOp.SELL]:
            self.signal = result.iloc[0]
        return result


# 拐点卖出条件单
class TurningPointSell(TradeStrategy):

    def __init__(self, order_book_id: str, dt: datetime, base: float,
                 cfg: EasyDict):
        super().__init__('拐点卖出条件单')
        self._order_book_id = order_book_id
        self._base = base
        self._tp = base
        self._state = TradeOp.OPEN
        self._state_history = [
            EasyDict({
                'datetime': dt,
                'signal': self._state,
                'op': '建仓',
                'base': self._base,
                'tp': self._tp,
                'price': self._base,
                'latest': self._base,
            })
        ]
        self._cfg = cfg

    def explain(self):
        latest = self._state_history[-1]
        VLogger.info('{}: increase_rt: {:2%}'.format(
            self._order_book_id, latest.latest / latest.base - 1))
        for state in self._state_history:
            VLogger.info(
                '{}: {}, base: {}, tp: {}, price: {}, increase_rt: {:2%}'.
                format(state.datetime, state.op, state.base, state.tp,
                       state.price, state.price / state.base - 1))

    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        assert len(df) == 1
        bar = df.iloc[0]

        # 不亏本卖出
        if bar.open < self._base:
            self._state = TradeOp.HOLD
            op = '埋伏'
        else:
            if self._state == TradeOp.SKIP_SELL:
                # 拐点卖出已经触发
                if bar.open > self._tp:
                    self._tp = bar.open
                    op = '持仓: 新高'
                elif bar.open < self._tp * (1 - self._cfg.drop_threshold):
                    self._state = TradeOp.CLOSE
                    op = '卖出: 高点回落超{:.2%}'.format(self._cfg.drop_threshold)
                else:
                    op = '持仓: 高点回落震荡'
            else:
                if bar.day_open / bar.open - 1 > self._cfg.daily_increase_threshold:
                    self._tp = bar.open
                    self._state = TradeOp.SKIP_SELL
                    op = '准备卖出: 日内拉升超{:.2%}: {} -> {}'.format(
                        self._cfg.daily_increase_threshold, bar.day_open, bar.open)
                elif bar.open > self._base * (1 + self._cfg.cummulated_increase_threshold):
                    self._tp = bar.open
                    self._state = TradeOp.SKIP_SELL
                    op = '准备卖出: 累计涨幅超{:.2%}: {} -> {}'.format(
                        self._cfg.cummulated_increase_threshold, self._base, bar.open)
                else:
                    self._state = TradeOp.HOLD
                    op = '持仓: 埋伏'

        if self._state_history[-1].op != op:
            self._state_history.append(
                EasyDict({
                    'datetime': now,
                    'signal': self._state,
                    'op': op,
                    'base': self._base,
                    'tp': self._tp,
                    'price': bar.open,
                    'latest': bar.open,
                }))
        else:
            self._state_history[-1].latest = bar.open

        signal = pd.DataFrame.from_records([{
            'order_book_id': self._order_book_id,
            'signal': self._state,
            'op': op,
            'base': self._base,
            'tp': self._tp,
        }])
        return df.reset_index().merge(signal, on=['order_book_id'])

    def init(self, context):
        pass

    def handle_bar(self, context, bar_dict):
        pass


# 拐点买入条件单
class TurningPointBuy(TradeStrategy):

    def __init__(self, threshold: float):
        super().__init__('拐点买入条件单')
        self._base: Union[pd.Series, None] = None
        self._threshold = threshold

    def init(self, context):
        pass

    def handle_bar(self, context, bar_dict):
        pass

    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        assert len(df) == 1
        instrument = df.iloc[0]
        if self._base is None:
            self._base = instrument
            s = TradeOp.SKIP_BUY
            op = '等待加仓'
        elif instrument.open < self._base.open:
            self._base = instrument
            s = TradeOp.SKIP_BUY
            op = '新低'
        elif instrument.open - self._base.open < self._threshold:
            s = TradeOp.SKIP_BUY
            op = '低点反弹'
        else:
            s = TradeOp.BUY
            op = '加仓'

        signal = pd.Series({
            'order_book_id': instrument.name,
            'signal': s,
            'op': op,
            'price': instrument.open,
            'tp': self._base.open,
        })
        return df.join(
            signal.to_frame().transpose().set_index('order_book_id'))


# 拐点网格
# 更适用与根据分钟线进行交易
class DynamicTurningPointGrid(GridStrategy):

    def __init__(self,
                 grid: List[float],
                 order_book_id: str,
                 percent: float,
                 bounce_turning_threshold: float,
                 drop_turning_threshold: float,
                 lots_per_grid: int = 1):
        super().__init__('动态拐点网格', grid, order_book_id, lots_per_grid)
        self._pct = percent
        self._bounce_turning_threshold = bounce_turning_threshold
        self._drop_turning_threshold = drop_turning_threshold
        self._tp: Union[float, None] = None
        self._base: Union[float, None] = None

    @log_signal
    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        assert len(df) == 1
        instrument = df.iloc[0]
        if self.signal is None:
            s = TradeOp.OPEN
            grid_id = self.grid_id(instrument.open)
            op = '建仓:{0:.2f}'.format(instrument.open)
            price = instrument.open
            self._base = price
        elif self.signal.signal == TradeOp.SKIP_SELL:
            # SKIP_SELL:卖出已触发
            if instrument.open > self._tp:
                s = TradeOp.SKIP_SELL
                grid_id = self.signal.grid_id
                op = '新高:{0:.2f}, 前高:{1:.2f}, 涨幅:{2:.2%}'.format(
                    instrument.open, self._tp,
                    instrument.open / self._base - 1)
                price = self.signal.price
                self._tp = instrument.open
            elif instrument.open < self._tp * (1 -
                                               self._drop_turning_threshold):
                grid_inc = int(
                    (self._tp - self._base) / self._base / self._pct)
                if not grid_inc > 0:
                    VLogger.error('tp: %s, base: %s, pct: %s' %
                                  (self._tp, self._base, self._pct))
                    raise Exception('fail')
                # 穿越多格穿网
                if self.signal.grid_id + grid_inc > len(self.grid):
                    grid_inc = len(self.grid) - self.signal.grid_id
                s = TradeOp.SELL
                grid_id = self.signal.grid_id + grid_inc
                op = '回落卖出:高点:{0:.2f}, 现价:{1:.2f}, 跌幅:{2:.2%}, 卖出{3}格'.format(
                    self._tp, instrument.open,
                    1 - (instrument.open / self._tp), grid_inc)
                price = instrument.open
                self._base = price
            else:
                s = TradeOp.SKIP_SELL
                grid_id = self.signal.grid_id
                op = '高点回落:前高:{0:.2f}, 现价:{1:.2f}, 跌幅:{2:.2%}'.format(
                    self._tp, instrument.open,
                    1 - (instrument.open / self._tp))
                price = self.signal.price
        elif self.signal.signal == TradeOp.SKIP_BUY:
            if instrument.open < self._tp:
                s = TradeOp.SKIP_BUY
                grid_id = self.signal.grid_id
                gr = self.grid_range(grid_id)
                op = '新低:{0:.2f}, 前低:{1:.2f}, 跌幅:{2:.2%}'.format(
                    instrument.open, self._tp, 1 - instrument.open / gr[0])
                self._tp = instrument.open
                price = self.signal.price
            elif instrument.open > self._tp * (1 +
                                               self._bounce_turning_threshold):
                grid_inc = int(
                    (self._base - self._tp) / self._base / self._pct)
                if not grid_inc > 0:
                    VLogger.error('tp: %s, base: %s, pct: %s' %
                                  (self._tp, self._base, self._pct))
                    raise Exception('fail')
                # 穿越多格穿网
                if self.signal.grid_id - grid_inc < 0:
                    grid_inc = self.signal.grid_id
                s = TradeOp.BUY
                grid_id = self.signal.grid_id - grid_inc
                op = '反弹买入:低点:{0:.2f}, 现价:{1:.2f}, 涨幅:{2:.2%}, 买入{3}格'.format(
                    self._tp, instrument.open,
                    (instrument.open / self._tp) - 1, grid_inc)
                price = instrument.open
                self._base = price
            else:
                s = TradeOp.SKIP_BUY
                grid_id = self.signal.grid_id
                op = '低点反弹:前低:{0:.2f}, 现价:{1:.2f}, 涨幅:{2:.2%}'.format(
                    self._tp, instrument.open,
                    (instrument.open / self._tp) - 1)
                price = self.signal.price
        elif instrument.open > self._base * (1 + self._pct):
            if self.signal.grid_id == len(self.grid):
                s = TradeOp.HOLD
                grid_id = self.signal.grid_id
                op = '休眠(空仓): 基准: {:.2f}, 现价: {:.2f}, 涨幅: {:.2%}'.format(
                    self._base, instrument.open,
                    instrument.open / self._base - 1)
                price = self.signal.price
            else:
                s = TradeOp.SKIP_SELL
                grid_id = self.signal.grid_id
                op = '触发卖出:基准:{0:.2f}, 现价:{1:.2f}, 涨幅:{2:.2%}'.format(
                    self._base, instrument.open,
                    instrument.open / self._base - 1)
                price = self.signal.price
                self._tp = instrument.open
        elif instrument.open < self._base * (1 - self._pct):
            if self.signal.grid_id == 0:
                s = TradeOp.HOLD
                grid_id = self.signal.grid_id
                op = '休眠(满仓): 基准: {:.2f}, 现价: {:.2f}, 跌幅: {:.2%}'.format(
                    self._base, instrument.open,
                    1 - instrument.open / self._base)
                price = self.signal.price
            else:
                s = TradeOp.SKIP_BUY
                grid_id = self.signal.grid_id
                op = '触发买入:基准:{0:.2f}, 现价:{1:.2f}, 跌幅:{2:.2%}'.format(
                    self._base, instrument.open,
                    self._base / instrument.open - 1)
                price = self.signal.price
                self._tp = instrument.open
        else:
            s = TradeOp.HOLD
            grid_id = self.signal.grid_id
            price = self.signal.price
            op = '格内震荡:基准:{:.2f}, 网格:[{:.2f}, {:.2f}], 现价:{:.2f}, 距离基准:{:.2%}'.format(
                self._base,
                self._base * (1 - self._pct) if grid_id > 0 else np.NINF,
                self._base *
                (1 + self._pct) if grid_id < len(self.grid) else np.Inf,
                instrument.open, instrument.open / self._base - 1)

        result = self.make_signal(df, s, grid_id, op, price)
        result['base'] = self._base
        self.signal = result.iloc[0]
        return result

    def init_state(self, signal: pd.Series, lots: int):
        super().init_state(signal, lots)
        if signal is not None:
            self._base = signal.price
        else:
            self._base = None


# 转债轮动策略
class ConbondStrategy(TradeStrategy, ABC):
    def __init__(self,
                 cfg: EasyDict,
                 conbond_data_fn: Union[None, Callable] = None, trader=None):
        self.cfg = cfg
        description = self.cfg.description.format(**self.cfg.vars_dict)
        super().__init__(description, trader)
        for key in ['filters', 'evals']:
            if key in self.cfg:
                values = {}
                for reason, expr in self.cfg[key].items():
                    reason = reason.format(**self.cfg.vars_dict)
                    values[reason] = expr.format(**self.cfg.vars_dict)
                self.cfg[key] = values
            else:
                self.cfg[key] = {}
        self._CACHE = {}
        self._filter_fields = set()
        self._eval_fields = set()
        self._last_eval: Union[None, str] = None
        self._conbond_data_fn = conbond_data_fn

    @staticmethod
    def strategy_cfg(name: str, vars_dict: Dict) -> EasyDict:
        with resources.path('quant', 'conbond.yml') as f:
            with open(f, mode='r', encoding='UTF-8') as fp:
                strategies = yaml.safe_load(fp)
        cfg = EasyDict(strategies[name])
        cfg.vars_dict.update(vars_dict)
        return cfg

    def eval_fn(self, now: datetime, expr: str, bond: pd.Series) -> bool:
        values = dict(bond.to_dict())
        stmt = expr.format(**values)
        try:
            return eval(stmt)
        except Exception as e:
            VLogger.error('stmt: %s, values: %s' % (stmt, values))
            raise e

    def fields_used(self, row: pd.Series, section: Dict, now: datetime):
        values = row.to_dict()
        values['now'] = now
        fields_used = set()
        for col, expr in section.items():
            try:
                stmt = expr.format(**values)
            except Exception as e:
                VLogger.error(values)
                raise e
            fields_used.update(set(parse.parse(expr, stmt).named.keys()))
            values[col] = col
        return fields_used - {'now'}

    def filter(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        if len(self._filter_fields) == 0:
            VLogger.vlog(3, self.cfg.filters)
            self._filter_fields = self.fields_used(
                df.head(1).reset_index().iloc[0], self.cfg.filters, now)

        dfs = []
        for reason, expr in self.cfg.filters.items():
            expr_dict = {f: 'df.{}'.format(f) for f in self._filter_fields}
            expr = expr.format(**expr_dict)
            try:
                fdf = df[eval(expr)].copy()
                fdf['filtered'] = reason
                df.drop(fdf.index, inplace=True)
                dfs.append(fdf)
            except Exception as e:
                VLogger.error(expr)
                raise e
        return pd.concat(dfs)

    def eval(self, df: pd.DataFrame, now: datetime) -> Tuple[pd.DataFrame, pd.DataFrame]:
        if len(self._eval_fields) == 0:
            VLogger.vlog(3, self.cfg.evals)
            self._eval_fields = self.fields_used(
                df.head(1).reset_index().iloc[0], self.cfg.evals, now)

        def rank(df_in: pd.Series) -> pd.Series:
            return df_in.rank()

        def ma(df_in: pd.Series, days: int) -> pd.Series:
            return df_in.rolling(days).mean()

        def days_from_now(df_in: pd.Series) -> int:
            return (df_in.dt.date - now.date()).dt.days

        result = df.copy()
        self._last_eval = None
        for col, expr in self.cfg.evals.items():
            expr_dict = {f: 'result.{}'.format(f) for f in self._eval_fields}
            expr_dict['now'] = now
            expr = expr.format(**expr_dict)
            try:
                temp = eval(expr)
                result[col] = temp
                self._last_eval = col
            except Exception as e:
                VLogger.error(expr)
                raise e
        assert self._last_eval is not None
        return result

    def init(self, context):
        pass

    def handle_bar(self, context, bar_dict):
        pass

    @log_signal
    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        assert not df.empty
        unfiltered, filtered = self._generate_signal(df, now)
        signals = unfiltered.nsmallest(self.cfg.vars_dict.top, self._last_eval)
        assert not signals.empty
        if VLogger.v >= 2:
            self._explain(signals, filtered, df)
        return signals

    def _explain(self, signals: pd.DataFrame, filtered: pd.DataFrame,
                 df: pd.DataFrame):
        from tabulate import tabulate
        vsignals = signals.reset_index().drop(
            columns=['datetime']).set_index('order_book_id')
        VLogger.vlog(2, 'Filtered:')
        VLogger.vlog(
            2,
            '\n' + tabulate(filtered.reset_index().drop(columns=['datetime']).
                            set_index('order_book_id').sort_values('filtered'),
                            headers='keys',
                            tablefmt='fancy_grid'))
        VLogger.vlog(2, 'Filtering conditions:')
        cond = pd.DataFrame.from_dict({
            'reason': self.cfg.filters.keys(),
            'condition': self.cfg.filters.values()
        }).set_index('reason')
        VLogger.vlog(
            2, '\n' + tabulate(cond, headers='keys', tablefmt='fancy_grid'))
        VLogger.vlog(
            2, 'Top {} candidates from {}:'.format(self.cfg.vars_dict.top,
                                                   self.description))
        VLogger.vlog(
            2,
            '\n' + tabulate(vsignals, headers='keys', tablefmt='fancy_grid'))

        if 'increase_rt' in df.columns:
            vsignals = vsignals.join(df[['increase_rt']])
            VLogger.vlog(
                2, 'Performance of top {} candidates today: {:.2%}'.format(
                    self.cfg.vars_dict.top,
                    vsignals.increase_rt.mean() / 100))
            VLogger.vlog(
                2, 'Performance of all {} conbonds today: {:.2%}'.format(
                    len(df),
                    df.increase_rt.mean() / 100))

    def _generate_signal(self, df: pd.DataFrame,
                         now: datetime) -> Tuple[pd.DataFrame, pd.DataFrame]:
        # Eval first, then apply filters
        df_eval = self.eval(df, now)
        assert not df_eval.empty

        filtered = self.filter(df_eval, now)
        unfiltered = df_eval[pd.isna(
            df_eval.join(filtered['filtered']).filtered)]
        assert not unfiltered.empty, filtered['filtered']
        unfiltered['filtered'] = None

        fields = {'symbol', 'filtered', self._last_eval}
        if 'industry' in df.columns:
            fields.add('industry')
        fields = fields.union(self._filter_fields).union(self._eval_fields)

        return unfiltered[list(fields)], filtered[list(fields)]

class ConbondRotateStrategy(ConbondStrategy, ABC):
    def __init__(self,
                 cfg: EasyDict,
                 conbond_data_fn: Union[None, Callable] = None, trader=None):
        super().__init__(cfg, conbond_data_fn, trader)

    def init(self, context):
        VLogger.vlog(0, 'Strategy: %s' % self.description)
        if self.cfg.vars_dict.rebalance == '周':
            VLogger.vlog(1, 'Run weekly...')
            self._trader.run_weekly(self.rebalance, tradingday=1)
        elif self.cfg.vars_dict.rebalance == '日':
            VLogger.vlog(1, 'Run daily...')
            self._trader.run_daily(self.rebalance, '09:35:00')
        elif self.cfg.vars_dict.rebalance == '月':
            VLogger.vlog(1, 'Run monthly...')
            self._trader.run_monthly(self.rebalance, tradingday=1)
        else:
            raise Exception('invalid schedule: %s' %
                            self.cfg.vars_dict.rebalance)

    def handle_bar(self, context, bar_dict):
        pass

    def rebalance(self, context, bar_dict=None):
        dt = context.now
        VLogger.vlog(1,
                     '{}: ********** {} **********'.format(datetime.now(), dt))
        bars = self._conbond_data_fn(context, bar_dict)
        VLogger.vlog(3, bars)
        signal = self.generate_signal(
            bars, dt).reset_index().set_index('order_book_id')
        bars = bars.reset_index().set_index('order_book_id')

        top_candidates = set(signal.index.tolist())
        positions = set(self._trader.get_positions(context))
        to_close = (positions - top_candidates)
        suspended = set()
        suspended_value = 0

        VLogger.vlog(1, '市值: {}'.format(self._trader.nav(context)))
        for order_book_id in to_close:
            p = self._trader.get_position(context, order_book_id)
            if order_book_id in bars.index:
                orders = self._trader.order_target_volume(
                    context, order_book_id, 0, bars.loc[order_book_id].open)
                for order in orders:
                    VLogger.vlog(
                        1,
                        '{}: 卖出{}: {}张, 价格{}'.format(context.now,
                                                     order_book_id, p.volume,
                                                     order.price))
            else:
                suspended.add(order_book_id)
                suspended_value += p.market_value
                VLogger.vlog(
                    1, '%s: %s: Not closing %s as it is suspended.' %
                    (dt, self.description, order_book_id))

        candidates = set(
            signal.head(len(top_candidates) - len(suspended)).index.tolist())
        value = self._trader.nav(context) - suspended_value
        target_value = 0.98 * value / len(candidates)
        VLogger.vlog(
            1,
            '可用: {}, 停牌价值: {}, 单个标的目标价值: {}, '.format(value, suspended_value,
                                                      target_value))
        to_open = candidates - positions
        to_rebalance = candidates - to_open
        for order_book_id in to_rebalance:
            price = bars.loc[order_book_id].open
            target_volume = int(target_value / price)
            p = self._trader.get_position(context, order_book_id)
            assert p is not None
            assert p.volume % 10 == 0
            if abs(target_volume - p.volume) > 10:
                target_volume -= target_volume % 10
                assert target_volume > 0
                orders = self._trader.order_target_volume(
                    context, order_book_id, target_volume, price)
                for order in orders:
                    VLogger.vlog(
                        1, '{}: 再平衡{}: {}张, 价格{}, 共{}张, 价值{}'.format(
                            context.now, order_book_id,
                            target_volume - p.volume, order.price,
                            target_volume, target_volume * order.price))
            else:
                VLogger.vlog(
                    1,
                    '{}: 持仓不动: {}, {}张, 价值{}'.format(context.now,
                                                     order_book_id, p.volume,
                                                     p.market_value))

        for order_book_id in to_open:
            price = bars.loc[order_book_id].open
            target_volume = int(target_value / price)
            target_volume -= target_volume % 10
            assert target_volume > 0
            orders = self._trader.order_target_volume(context, order_book_id,
                                                      target_volume, price)
            for order in orders:
                VLogger.vlog(
                    1, '{}: 建仓{}: {}张, 价格{}, 价值{}'.format(
                        context.now, order_book_id, target_volume, order.price,
                        target_volume * order.price))

        VLogger.vlog(1, 'Rebalance done\n')


class Benchmark(TradeStrategy, ABC):

    def __init__(self, description, order_book_id):
        super().__init__(description)
        self.order_book_id = order_book_id

    @log_signal
    def generate_signal(self, df: pd.DataFrame, now: datetime) -> pd.DataFrame:
        pass

    def init(self, context):
        VLogger.vlog(
            0, 'Backtesting benchmark %s(%s)...' %
            (self.description, self.order_book_id))

    def handle_bar(self, context, bar_dict):
        p = self._trader.get_position(context, self.order_book_id)
        if p is None:
            self._trader.order_target_percent(context, self.order_book_id, 1)
